package com.atguigu.flink.datastream.sink;

import com.atguigu.flink.func.ClickSource;
import com.atguigu.flink.pojo.Event;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;

/**
 * Writes a stream of {@link Event}s to Kafka through {@link KafkaSink}, attaching a record key.
 *
 * <p>Kafka Connector:
 * <ol>
 *   <li>KafkaSource</li>
 *   <li>KafkaSink
 *     <ul>
 *       <li>Producer: KafkaProducer</li>
 *       <li>Producer configuration:
 *         <ul>
 *           <li>Cluster location: {@code bootstrap.servers}</li>
 *           <li>Key serializer: {@code key.serializer}</li>
 *           <li>Value serializer: {@code value.serializer}</li>
 *           <li>Buffer size: {@code buffer.memory}</li>
 *           <li>Per-partition buffer size (batch size): {@code batch.size}</li>
 *           <li>Per-batch timeout: {@code linger.ms}</li>
 *           <li>Acknowledgement level: {@code acks}</li>
 *           <li>Producer transaction timeout: {@code transaction.timeout.ms}</li>
 *           <li>Producer transactional id: {@code transactional.id}</li>
 *         </ul>
 *       </li>
 *     </ul>
 *   </li>
 * </ol>
 *
 * <p>Producer partition assignment (the sticky partitioner is the default):
 * <ol>
 *   <li>If a partition number is given explicitly, it is used as-is.</li>
 *   <li>If no partition is given but a key is, the partition is derived from the key's hash.</li>
 *   <li>If neither partition nor key is given, the sticky strategy is used.</li>
 * </ol>
 *
 * <p>The producer configuration keys are exposed by Kafka's {@link ProducerConfig} class.
 *
 * @author WEIYUNHUI
 * @date 2023/6/14 11:25
 */
public class Flink03_KafkaSinkWithKey {

    /** Kafka brokers used to bootstrap the producer connection. */
    private static final String BOOTSTRAP_SERVERS = "hadoop102:9092, hadoop103:9092";

    /** Topic every record is written to. */
    private static final String TOPIC = "topicA";

    /**
     * Prefix for the transactional ids used in EXACTLY_ONCE mode.
     *
     * <p>Deliberately a fixed, job-specific value: a prefix that changes between submissions
     * (e.g. a random suffix) prevents a restarted job from finding and aborting the transactions
     * left open by its previous run, and a small random range can collide with other jobs on the
     * same Kafka cluster.
     */
    private static final String TRANSACTIONAL_ID_PREFIX = "flink03-kafka-sink-with-key";

    /**
     * Value for {@code transaction.timeout.ms}, in milliseconds (36 s).
     *
     * <p>NOTE(review): this must not exceed the broker's {@code transaction.max.timeout.ms} and
     * should comfortably exceed the checkpoint interval plus the expected restart time; 36 s is
     * short for anything but a demo — confirm the intended value.
     */
    private static final String TRANSACTION_TIMEOUT_MS = "36000";

    /** Checkpoint interval in milliseconds. */
    private static final long CHECKPOINT_INTERVAL_MS = 5000L;

    /**
     * Builds and runs the job: {@link ClickSource} -> Kafka topic {@code topicA}.
     *
     * @param args command-line arguments (unused)
     * @throws RuntimeException wrapping any exception thrown while executing the job
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Checkpointing is enabled because the sink below runs in EXACTLY_ONCE mode.
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);

        DataStreamSource<Event> ds = env.addSource(new ClickSource());

        // Write the stream's records to Kafka.
        KafkaSink<Event> kafkaSink = KafkaSink.<Event>builder()
                .setBootstrapServers(BOOTSTRAP_SERVERS)
                .setRecordSerializer(new KeyedEventSerializationSchema())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix(TRANSACTIONAL_ID_PREFIX)
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, TRANSACTION_TIMEOUT_MS)
                .build();

        // New (unified) sink API.
        ds.sinkTo(kafkaSink);

        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Turns an {@link Event} into a Kafka {@link ProducerRecord} (the producer-side message):
     * the key is the event's user and the value is the event's {@code toString()} form,
     * both encoded as UTF-8.
     */
    private static final class KeyedEventSerializationSchema
            implements KafkaRecordSerializationSchema<Event> {

        private static final long serialVersionUID = 1L;

        /**
         * Serializes one event into a keyed record for {@code topicA}.
         *
         * @param element   the event to serialize
         * @param context   sink context (unused)
         * @param timestamp record timestamp supplied by Flink (unused)
         * @return the record to send; never {@code null} in this implementation
         */
        @Nullable
        @Override
        public ProducerRecord<byte[], byte[]> serialize(Event element, KafkaSinkContext context, Long timestamp) {
            // Use an explicit charset so the bytes do not depend on the JVM's platform default.
            byte[] key = element.getUser().getBytes(StandardCharsets.UTF_8);
            byte[] value = element.toString().getBytes(StandardCharsets.UTF_8);
            return new ProducerRecord<>(TOPIC, key, value);
        }
    }
}
